离线处理和实时处理 | 您所在的位置:网站首页 › 实时 离线 › 离线处理和实时处理 |
大数据实战项目(1)-项目简介、开发技术、工具、架构等 大数据实战项目(2)-数据采集、处理、分发流程所涉及到的框架及配置 这一部分主要是对数据进行离线处理和实时处理的总结。 离线数据处理 MySQL+HiveMySQL一方面用来存储Hive的元数据,另一方面存储离线分析的结果。 1)MySQL的安装 2)Hive的安装 #hive-log4j.properties #日志目录需要提前创建 property.hive.log.dir = /opt/modules/hive-2.1.0/logs #修改hive-env.sh配置文件 #Set HADOOP_HOME to point to a specific hadoop install directory HADOOP_HOME=/opt/modules/hadoop-2.6.0 HBASE_HOME=/opt/modules/hbase-1.0.0-cdh5.4.0 # Hive Configuration Directory can be controlled by: export HIVE_CONF_DIR=/opt/modules/hive-2.1.0/conf # Folder containing extra ibraries required for hive compilation/execution can be controlled by: export HIVE_AUX_JARS_PATH=/opt/modules/hive-2.1.0/lib3)Hive与MySQL集成 创建hive-site.xml文件,配置mysql元数据库metastore javax.jdo.option.ConnectionURL jdbc:mysql://bigdata-pro01.bigDAta.com/metastore?createDatabaseIfNotExist=true javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName root javax.jdo.option.ConnectionPassword 123456 hive.cli.print.header true hive.cli.print.current.db true hbase.zookeeper.quorum bigdata-pro01.bigDAta.com,bigdata-pro02.bigDAta.com,bigdata-pro03.bigDAta.com 在MySQL数据中设置用户连接信息,可以无阻碍访问mysql数据库,其次要保证Hive所在节点能无密钥登录其他集群内节点4)Hive与MySQL集成测试 启动HDFS和YARN服务启动Hive通过Hive服务创建表,并向这个表中加载数据,在Hive查看表中内容在MySQL数据库metastore中查看元数据 Hive+HBase集成Hive是一个数据仓库,主要是转为MapReduce完成对大量数据的离线分析和决策,之前完成了Flume集成HBase,此时HBase中能源源不断地插入数据,那么如何使Hive中也有数据呢?使用外部表进行Hive与HBase的关联 hbase.zookeeper.quorum bigdata-pro01.bigDAta.com,bigdata-pro02.bigDAta.com,bigdata-pro03.bigDAta.com将 HBase中的部分jar包拷贝到Hive中,如果两者都是CDH版本,就不需要进行拷贝;若hive安装时自带了以下jar包,将其删除。使用软连接的方式 export HBASE_HOME=/opt/modules/hbase-1.0.0-cdh5.4.0 export HIVE_HOME=/opt/modules/hive-2.1.0 ln -s $HBASE_HOME/lib/hbase-server-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-server-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/hbase-client-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-client-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/hbase-protocol-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-protocol-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/hbase-it-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-it-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/htrace-core-3.0.4.jar $HIVE_HOME/lib/htrace-core-3.0.4.jar ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/high-scale-lib-1.1.1.jar $HIVE_HOME/lib/high-scale-lib-1.1.1.jar ln -s $HBASE_HOME/lib/hbase-common-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-common-1.0.0-cdh5.4.0.jar在Hive中创建一个与HBase中的表建立关联的外部表 create external table weblogs( id string, datatime string, userid string, searchname string, retorder string, cliorder string, cliurl string ) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES("hbase.columns.mapping" = ":key,info:datatime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl") TBLPROPERTIES("hbase.table.name" = "weblogs");可通过在Hive与HBase中输入count ‘weblogs’,查看数据是否同步。 Cloudera Hue可视化分析1)下载、安装及编译 详细过程,记录在Hadoop可视化神器-Hue安装、编译、运行 2)基本配置 1.配置desktop/conf/hue.ini 2.修改desktop.db文件权限3)集成 具体内容参考:Hue集成HDFS、YARN、Hive、MySql、HBase的相关配置,此处仅是流程。 与HDFS集成 与YARN集成 与Hive集成 与MySQL集成 与HBase集成 实时数据处理 Spark与Kafka集成1)Spark下载 安装与编译 2)Structured Streaming 与Kafka集成 将kafka_2.11-0.10.0.0.jar kafka-clients-0.10.0.0.jar spark-sql-kafka-0-10_2.11-2.2.0.jar spark-streaming-kafka-0-10_2.11-2.1.0.jar等包添加到spark下的jars目录下 在IDEA中编写如下代码,Structured Streaming从kafka中读取数据,并进行计算 val spark = SparkSession.builder() .master("local[2]") .appName("streaming").getOrCreate() val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "bigdata-pro01.bigDAta.com:9092,bigdata-pro02.bigDAta.com:9092,bigdata-pro03.bigDAta.com:9092") .option("subscribe", "weblogs") .load() import spark.implicits._ val lines = df.selectExpr("CAST(value AS STRING)").as[String] val weblog = lines.map(_.split(",")) .map(x => Weblog(x(0), x(1), x(2),x(3),x(4),x(5))) val titleCount = weblog .groupBy("searchname").count().toDF("titleName","count") Spark与MySQL集成由于这里仅仅需要对报表进行展示,前台展示的字段并不多,MySQL完全可以支撑。在HBase中有几百万条数据( 一个浏览话题可能有十几万人搜索过,也就是说一个话题就有十几万条数据,这么大量数据当然要存在Hbase中 ),而经过Spark的计算, 这十几万条数据在mysql中就变成了一条数据(titleName,count)。 如果需要实时查询用户各种信息(数据量很大,字段很多),那么就需要实时的直接从Hbase里查,而不会在Mysql中。 val url ="jdbc:mysql://bigdata-pro01.bigDAta.com:3306/test" val username="root" val password="123456" val writer = new JDBCSink(url,username,password) val query = titleCount.writeStream .foreach(writer) .outputMode("update") .trigger(ProcessingTime("5 seconds")) .start() query.awaitTermination()其中的JDBCSink具体代码如下所示: import java.sql._ import org.apache.spark.sql.{ForeachWriter, Row} class JDBCSink(url:String, username:String,password:String) extends ForeachWriter[Row]{ //var是一个变量 //val常量 var statement : Statement =_ var resultSet : ResultSet =_ var connection : Connection=_ override def open(partitionId: Long, version: Long): Boolean = { connection = new MySqlPool(url,username,password).getJdbcConn(); statement = connection.createStatement() return true } //处理数据 override def process(value: Row): Unit = { // 将titleName中的[[]]用空格代替。标记一个中括号表达式的开始。要匹配 [,请使用 \[ val titleName = value.getAs[String]("titleName").replaceAll("[\\[\\]]","") val count = value.getAs[Long]("count"); val querySql = "select 1 from webCount " + "where titleName = '"+titleName+"'" val updateSql = "update webCount set " + "count = "+count+" where titleName = '"+titleName+"'" val insertSql = "insert into webCount(titleName,count)" + "values('"+titleName+"',"+count+")" try{ var resultSet = statement.executeQuery(querySql) if(resultSet.next()){ //如果有执行updateSql statement.executeUpdate(updateSql) }else{ //没有的话就执行insertSql statement.execute(insertSql) } }catch { case ex: SQLException => { println("SQLException") } case ex: Exception => { println("Exception") } case ex: RuntimeException => { println("RuntimeException") } case ex: Throwable => { println("Throwable") } } } override def close(errorOrNull: Throwable): Unit = { if(statement==null){ statement.close() } if(connection==null){ connection.close() } } }而在JDBCSink中用到的MySqlPool连接池的具体代码如下所示 import java.sql.{Connection, DriverManager} import java.util class MySqlPool(url:String, user:String, pwd:String) extends Serializable{ private val max = 3 //连接池连接总数 private val connectionNum = 1 //每次产生连接数 private var conNum = 0 //当前连接池已产生的连接数 private val pool = new util.LinkedList[Connection]() //连接池 //获取连接 def getJdbcConn() : Connection = { //同步代码块 AnyRef.synchronized({ if(pool.isEmpty){ //加载驱动 preGetConn() for(i |
CopyRight 2018-2019 实验室设备网 版权所有 |